package com.offcn.bigdata.streaming.p1

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * streaming读取数据之从hdfs文件系统中读取
  *   这里只能监听到目录级别——目录中的新增文件，不能监听到文件中的新增数据
  * 说明：这种方式下面没有receiver来接收数据，因此就不需要在多分配线程资源来进行数据读取
  *
  */
object _03StreamingReadDataFromHDFSOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
                    .setAppName("_03StreamingReadDataFromHDFSOps")
                    .setMaster("local")
        val batchInterval = Seconds(2)
        val ssc = new StreamingContext(conf, batchInterval)

        //加载hdfs文件内容
//        val lines: DStream[String] = ssc.textFileStream("file:/E:/data/minitor")
        val lines: DStream[String] = ssc.textFileStream("hdfs://ns1/data/spark/monitor/")
        val ret = lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_)

        ret.print

        ssc.start()
        ssc.awaitTermination()
    }
}
